
//
//    /**
//     * 写入数据库
//     *
//     * @param db 数据库名
//     * @param retention 保存策略
//     * @param measurement 表
//     * @param time 时间
//     * @param tags 标签
//     * @param fields 值
//     */
//    public static void write(String db, String retention, String measurement, long time, Map<String, String> tags,
//        Map<String, Object> fields) {
//
//        try {
//            // 保存到influxdb
//            Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields)
//                .build();
//
//            InfluxDBClient.getInstance().write(db, retention, point);
//        }
//        catch (Exception e) {
//            logger.error(e);
//        }
//
//    }
//
//    /**
//     * 写入数据库
//     *
//     * @param db 数据库名
//     * @param retention 保存策略
//     * @param measurement 表
//     * @param time 时间
//     * @param timeUnit 时间类型
//     * @param tags 标签
//     * @param fields 值
//     */
//    public static void write(String db, String retention, String measurement, long time, TimeUnit timeUnit,
//        Map<String, String> tags, Map<String, Object> fields) {
//
//        try {
//            // 保存到influxdb
//            Point point = Point.measurement(measurement).time(time, timeUnit).tag(tags).fields(fields).build();
//
//            InfluxDBClient.getInstance().write(db, retention, point);
//        }
//        catch (Exception e) {
//            logger.error(e);
//        }
//
//    }
//

package com.ztesoft.zsmart.zcm.monitor.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.google.common.base.Preconditions;
import com.ztesoft.zsmart.core.spring.SpringContext;
import org.apache.commons.collections.CollectionUtils;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import com.ztesoft.zsmart.core.log.ZSmartLogger;
import com.ztesoft.zsmart.core.utils.StringUtil;
import com.ztesoft.zsmart.zcm.monitor.config.MonitorProperties;

import okhttp3.OkHttpClient;

/**
 * Created by lkj on 2018/4/09.
 */
public final class InfluxDBUtil {

    private static final int INFLUXDB_READ_POOL_SIZE = SpringContext.getBean(MonitorProperties.class)
        .getInfluxdbReadPoolSize();

    private static final int INFLUXDB_READ_TIMEOUT_SECONDS = SpringContext.getBean(MonitorProperties.class)
        .getInfluxdbReadTimeoutSeconds();

    private static final AtomicInteger INFLUXDB_READ_USING_POOL_SIZE = new AtomicInteger(0);

    /**
     * 读取模拟连接池
     */
    private static BlockingQueue<InfluxDB> influxDBReadPool = new LinkedBlockingQueue<>(INFLUXDB_READ_POOL_SIZE);

    private static final ZSmartLogger logger = ZSmartLogger.getLogger(InfluxDBUtil.class);

    private static final MonitorProperties imonitorProperties = SpringContextUtil.getBean(MonitorProperties.class);

    private static final int DEFAULT_BATCH_SIZE_CACHE = 100;

    private static final int DEFAULT_BATCH_PERIOD = 5;

    public static final String K2H_RAW = "k2h_raw";

    public static final String K7D_S5M = "k7d_s5m";

    public static final String K90D_S30M = "k90d_s30m";

    public static final String HOST_METRICS = "host_metrics";

    public static final String PROMETHEUS_METRICS = "prometheus_metrics";

    public static final String APPLICATION_METRICS = "application_metrics";

    public static final String MW_METRICS = "mw_metrics";

    public static final String K8S_METRICS = "k8s_metrics";

    public static final String BSS_METRICS = "bss_metrics";

    public static OkHttpClient.Builder client = initOkHttpClient();

    /**
     * 初始化读连接池
     */
    static {
        initInfluxdbReadPool();
    }

    private InfluxDBUtil() {
    }

    /**
     * 单例模式
     */
    private static InfluxDB influxDB;

    /**
     * 获取influxDB
     *
     * @return InfluxDB
     */
    public static InfluxDB getInfluxDB() {
        if (influxDB == null) {
            synchronized (InfluxDBUtil.class) {
                if (null != influxDB) {
                    return influxDB;
                }
                String influxDBUrl = imonitorProperties.getInfluxdbUrl();
                String influxDBUser = imonitorProperties.getInfluxdbUser();
                String influxDBPassword = imonitorProperties.getInfluxdbPassword();
                logger.info("Influxdb url is [{}], access user is [{}]", influxDBUrl, influxDBUser);
                if (StringUtil.isNotEmpty(influxDBUser) && StringUtil.isNotEmpty(influxDBPassword)) {
                    influxDB = InfluxDBFactory.connect(influxDBUrl, influxDBUser, influxDBPassword, client);
                }
                else {
                    influxDB = InfluxDBFactory.connect(influxDBUrl, client);
                }
                // 改为批量模式
                influxDB.enableBatch(DEFAULT_BATCH_SIZE_CACHE, DEFAULT_BATCH_PERIOD, TimeUnit.SECONDS)
                    .enablePointTypeJson();
            }
        }
        return influxDB;
    }

    private static void initInfluxdbReadPool() {
        logger.info("init influxdb read pool......");
        influxDBReadPool.clear();
        INFLUXDB_READ_USING_POOL_SIZE.set(0);
        MonitorProperties imonitorProperties = SpringContextUtil.getBean(MonitorProperties.class);
        String influxDBUrl = imonitorProperties.getInfluxdbUrlRead();
        String influxDBUser = imonitorProperties.getInfluxdbUser();
        String influxDBPassword = imonitorProperties.getInfluxdbPassword();
        logger.info("Influxdb url read is [{}], access user is [{}]", influxDBUrl, influxDBUser);
        for (int i = 0; i < INFLUXDB_READ_POOL_SIZE; i++) {

            if (StringUtil.isNotEmpty(influxDBUser) && StringUtil.isNotEmpty(influxDBPassword)) {
                putDbToReadPool(InfluxDBFactory.connect(influxDBUrl, influxDBUser, influxDBPassword, client));
            }
            else {
                putDbToReadPool(InfluxDBFactory.connect(influxDBUrl, client));
            }
        }
    }

    /**
     * 获取influxDB
     *
     * @return InfluxDB
     */
    private static InfluxDB getInfluxDBRead() {
        InfluxDB db = null;
        try {
            db = influxDBReadPool.poll(INFLUXDB_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            INFLUXDB_READ_USING_POOL_SIZE.getAndDecrement();
            Preconditions.checkNotNull(db, "get db null");
        }
        catch (Exception e) {
            logger.error("get influxdb from pool time out:", e);
            if (INFLUXDB_READ_USING_POOL_SIZE.get() <= 0) {
                initInfluxdbReadPool();
                try {
                    db = influxDBReadPool.poll(INFLUXDB_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    INFLUXDB_READ_USING_POOL_SIZE.getAndDecrement();
                }
                catch (Exception e1) {
                    logger.error("get db error", e);
                }

            }
        }
        return db;
    }

    /**
     * 批量写
     *
     * @param batchPoints BatchPoints
     */
    public static void batchWrite(BatchPoints batchPoints) {
        getInfluxDB().write(batchPoints);
    }

    /**
     * 单点写
     *
     * @param db String
     * @param retention String
     * @param point Point
     */
    public static void write(String db, String retention, Point point) {
        getInfluxDB().write(db, retention, point);
    }

    /**
     * 查询数据
     *
     * @param db 数据库名
     * @param command 查询命令
     * @return QueryResult
     */
    public static QueryResult query(String db, String command) {
        Query query = new Query(command, db);
        InfluxDB influxDBRead = null;
        QueryResult result = null;
        result = doQuery(query, influxDBRead, result);

        return result;
    }

    private static QueryResult doQuery(Query query, InfluxDB influxDBRead, QueryResult result) {
        try {
            influxDBRead = getInfluxDBRead();
            if (influxDBRead != null) {
                result = influxDBRead.query(query);
            }
            else {
                logger.error("not get influxdb...");
            }
        }
        catch (Exception e) {
            logger.error("qry influxdb error:", e);
        }
        finally {
            if (influxDBRead != null) {
                putDbToReadPool(influxDBRead);
            }
        }
        return result;
    }

    /**
     * 读连接放入队列中
     *
     * @param db
     */
    private static void putDbToReadPool(InfluxDB db) {
        influxDBReadPool.offer(db);
        INFLUXDB_READ_USING_POOL_SIZE.getAndIncrement();
    }

    /**
     * 查询influxdb
     *
     * @param query Query
     * @return QueryResult
     */
    public static QueryResult query(Query query) {
        InfluxDB influxDBRead = null;
        QueryResult result = null;
        result = doQuery(query, influxDBRead, result);
        return result;
    }

    /**
     * 查询
     *
     * @param query Query
     * @param chunkSize int
     * @param consumer Consumer<QueryResult>
     */
    public static void query(Query query, int chunkSize, final Consumer<QueryResult> consumer) {
        getInfluxDBRead().query(query, chunkSize, consumer);
    }

    public static List<Map<String, Object>> getResults(String sql, String dbName) {
        List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
        QueryResult queryResult = InfluxDBUtil.query(dbName, sql);
        if (queryResult != null) {
            List<QueryResult.Result> resultList = queryResult.getResults();
            if (CollectionUtils.isNotEmpty(resultList)) {
                QueryResult.Result result = resultList.get(0);
                List<QueryResult.Series> seriesList = result.getSeries();
                dealListSeries(results, seriesList);
            }
        }
        return results;
    }

    private static void dealListSeries(List<Map<String, Object>> results, List<QueryResult.Series> seriesList) {
        if (CollectionUtils.isNotEmpty(seriesList)) {
            for (QueryResult.Series series : seriesList) {
                List<List<Object>> valueList = series.getValues();
                if (CollectionUtils.isNotEmpty(valueList)) {
                    for (List<Object> objList : valueList) {
                        Map<String, Object> m = new HashMap<String, Object>();
                        m.put("tag", series.getTags());
                        if (objList != null && !objList.isEmpty()) {
                            for (int j = 0; j < objList.size(); j++) {
                                m.put("val" + j, objList.get(j));
                            }
                            results.add(m);
                        }
                    }
                }
            }
        }
    }

    public static Map<String, Object> getResult(String sql, String dbName) {
        Map<String, Object> usageMap = new HashMap<String, Object>();
        QueryResult queryResult = InfluxDBUtil.query(dbName, sql);
        if (queryResult != null) {
            List<QueryResult.Result> resultList = queryResult.getResults();
            if (CollectionUtils.isNotEmpty(resultList)) {
                QueryResult.Result result = resultList.get(0);
                List<QueryResult.Series> seriesList = result.getSeries();
                dealSeries(usageMap, seriesList);
            }
        }
        return usageMap;
    }

    private static void dealSeries(Map<String, Object> usageMap, List<QueryResult.Series> seriesList) {
        if (CollectionUtils.isNotEmpty(seriesList)) {
            for (QueryResult.Series series : seriesList) {
                List<List<Object>> valueList = series.getValues();
                usageMap.put("tag", series.getTags());
                if (CollectionUtils.isNotEmpty(valueList)) {
                    List<Object> objList = valueList.get(0);
                    if (objList != null && !objList.isEmpty()) {
                        if (CollectionUtils.isNotEmpty(objList)) {
                            for (int j = 0; j < objList.size(); j++) {
                                usageMap.put("val" + j, objList.get(j));
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * 动态配置influxdb里连接超时和读取超时 单号：2189670 createdTime:20200909
     *
     * @return
     */
    private static OkHttpClient.Builder initOkHttpClient() {
        long influxdbReadTimeOut = imonitorProperties.getInfluxdbReadTimeOut();
        long influxdbConnectTimeOut = imonitorProperties.getInfluxdbConnectTimeOut();
        if (influxdbReadTimeOut < 10L || influxdbConnectTimeOut < 10L) {
            influxdbReadTimeOut = 10L;
            influxdbConnectTimeOut = 10L;
        }
        client = new OkHttpClient.Builder().readTimeout(influxdbReadTimeOut, TimeUnit.SECONDS)
            .connectTimeout(influxdbConnectTimeOut, TimeUnit.SECONDS);
        return client;
    }

    public static void main(String[] args) {
        /**
         * InfluxDB influxdb = InfluxDBFactory.connect("http://172.16.17.82:8586", "admin", "abc@123A"); Query query =
         * new Query("select * from alarm_event limit 100", "alarm"); QueryResult result = influxdb.query(query);
         */
    }
}
